# -*- coding: utf-8 -*-
"""
Copyright (c) 2020, Nimblex Co., Ltd.

Created on 2020-12-10 15:42
"""
import threading
import sys

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError
import sqlalchemy.engine.url as url


# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()


class Counter(Base):
    """ORM model mapped to the ``counters`` table."""

    __tablename__ = 'counters'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Integer value stored for the row.
    counter = Column(Integer)


# Module-wide SQLAlchemy engine; stays None until init_db_engine() assigns it.
engine = None


def init_db_engine(host, port, dbname, user, passwd):
    """Create the PostgreSQL engine and store it in the module-level ``engine``.

    Args:
        host: Database server host.
        port: Database server port.
        dbname: Name of the database to connect to.
        user: Login user name.
        passwd: Login password.
    """
    global engine

    # NOTE(review): direct ``URL(...)`` construction and the ``encoding``
    # engine option are believed to be deprecated in newer SQLAlchemy
    # releases -- verify against the installed version before upgrading.
    connection_url = url.URL(
        drivername="postgresql",
        username=user,
        password=passwd,
        host=host,
        port=port,
        database=dbname,
    )
    engine_options = dict(
        encoding='utf-8',
        echo=False,
        pool_size=100,
        pool_recycle=3600,
        pool_pre_ping=True,
    )
    engine = create_engine(connection_url, **engine_options)


def get_session():
    """Return a new ORM session bound to the module-level ``engine``.

    A fresh ``sessionmaker`` factory is built on every call.
    """
    session_factory = sessionmaker(bind=engine)
    return session_factory()


def test_transaction_try_again():
    """Increment the first ``Counter`` row from 10 threads, retrying on conflicts.

    Each thread runs one read-modify-write transaction. When it fails with an
    ``OperationalError`` whose text contains one of the retry markers, the
    whole transaction is run again; any other ``OperationalError`` is
    re-raised inside the worker thread.
    """
    retry_markers = ('Try again', '40001', 'Restart read required')

    def is_retryable(err):
        # ``err.message`` only exists on Python 2 exceptions; on Python 3 the
        # attribute access itself raises AttributeError inside the handler and
        # the retry never happens. Read the text of the wrapped driver error
        # instead (falling back to the exception itself), which works on both.
        driver_error = getattr(err, 'orig', None)
        text = str(driver_error if driver_error is not None else err)
        return any(marker in text for marker in retry_markers)

    def incr_in_tx():
        session = get_session()
        try:
            cnt = session.query(Counter).first()
            cnt.counter = cnt.counter + 1
            session.add(cnt)
            session.commit()
        finally:
            # Runs on success and on failure alike, as in the original flow.
            session.rollback()
            session.close()

    def retryer():
        while True:
            try:
                incr_in_tx()
            except OperationalError as e:
                if is_retryable(e):
                    print('try again')
                    continue

                # Any other error: bare ``raise`` keeps the original traceback.
                raise

            return

    threads = []
    for _ in range(10):
        worker = threading.Thread(target=retryer)
        worker.start()
        threads.append(worker)

    for worker in threads:
        worker.join()


def main():
    """Reset the ``counters`` table to a single row and run the retry test.

    Reads the database host from ``sys.argv[1]`` and the port from
    ``sys.argv[2]``; database name, user and password are all ``'test'``.
    """
    host = sys.argv[1]
    port = int(sys.argv[2])

    init_db_engine(host, port, 'test', 'test', 'test')
    Base.metadata.create_all(engine)

    session = get_session()
    try:
        session.execute('delete from counters')
        session.add(Counter(counter=1))
        session.commit()
    finally:
        # Close the session even when the reset fails, so it is not left
        # open while the error propagates.
        session.close()

    test_transaction_try_again()


# Script entry point: only runs main() when the file is executed directly.
if __name__ == '__main__':
    main()
